package com.jplus.plugins.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class for basic ActiveMQ operations: sending text messages to a
 * queue and registering a listener that receives text messages from it.
 *
 * <p>
 * {@link #sendMessage(String)} opens a connection, sends one message inside a
 * transacted session and closes the connection again.
 * {@link #startMsgListener(IMsgHandler, int)} opens a connection and leaves it
 * open so that the registered listener keeps receiving messages.
 *
 * <p>
 * Failures are logged and not propagated to the caller.
 *
 * @author Yuanqy
 */
public class ActiveMqTemplete {
	/** Broker address used by the no-argument constructor. */
	private static final String DEFAULT_ADDRESS = "tcp://localhost:61616";
	/** Queue name used by the no-argument constructor. */
	private static final String DEFAULT_QUEUE_NAME = "TestQueue";

	private final Logger logger = LoggerFactory.getLogger(getClass());
	private final String address;
	private final String queueName;
	/**
	 * Connection most recently opened by the current thread; cleared when that
	 * connection is closed by this class.
	 */
	ThreadLocal<Connection> connection = new ThreadLocal<Connection>();

	// == Constructors ==========================
	/**
	 * Creates a template bound to {@code tcp://localhost:61616} and the queue
	 * {@code TestQueue}.
	 */
	public ActiveMqTemplete() {
		this(DEFAULT_ADDRESS, DEFAULT_QUEUE_NAME);
	}

	/**
	 * Creates a template bound to the given broker and queue.
	 *
	 * @param address   broker URL passed to {@link ActiveMQConnectionFactory}
	 * @param queueName name of the queue to send to / consume from
	 */
	public ActiveMqTemplete(String address, String queueName) {
		this.address = address;
		this.queueName = queueName;
	}

	// == Connection handling ===========================
	/**
	 * Opens and starts a new connection, publishes it in {@link #connection} for
	 * the current thread and returns it.
	 *
	 * @return the started connection, never {@code null}
	 * @throws JMSException if the connection cannot be created or started; in the
	 *                      latter case the half-open connection is closed first
	 */
	private synchronized Connection getConnection() throws JMSException {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, address);
		Connection opened = connectionFactory.createConnection();
		boolean started = false;
		try {
			opened.start();
			started = true;
		} finally {
			// Do not leave a connection behind that was created but failed to start.
			if (!started) {
				close(opened);
			}
		}
		connection.set(opened);
		logger.info("获取Connection");
		return opened;
	}

	// == Sending ============================
	/**
	 * Sends {@code msg} as a non-persistent {@link TextMessage} to the configured
	 * queue inside a transacted session. The transaction is committed on success
	 * and rolled back on failure; the connection opened for this call is always
	 * closed afterwards.
	 *
	 * @param msg text payload of the message
	 */
	public void sendMessage(String msg) {
		// Track the connection opened by THIS call so that the cleanup below never
		// touches a connection a previous call left in the thread-local.
		Connection conn = null;
		Session session = null;
		try {
			conn = getConnection();
			session = conn.createSession(true, Session.SESSION_TRANSACTED);
			Destination destination = session.createQueue(queueName);
			MessageProducer producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			TextMessage message = session.createTextMessage(msg);
			producer.send(message);
			commit(session);
			logger.info("jmsSendMessage:{}", msg);
		} catch (Exception e) {
			rollback(session);
			logger.error("Failed to send message to queue '" + queueName + "' at " + address, e);
		} finally {
			close(conn);
		}
	}

	// == Listening =========================

	/**
	 * Opens a connection, applies the given queue prefetch size to it and
	 * registers a listener on the configured queue that forwards every received
	 * {@link TextMessage} to {@code handler}. The connection is left open on
	 * success and closed if the setup fails.
	 *
	 * @param handler       callback invoked for each received text message; when
	 *                      {@code null} an error is logged and nothing is started
	 * @param queuePrefetch value passed to
	 *                      {@link ActiveMQPrefetchPolicy#setQueuePrefetch(int)}
	 */
	public void startMsgListener(IMsgHandler handler, int queuePrefetch) {
		if (handler == null) {
			// Registering a listener without a handler would fail on every delivery.
			logger.error("Cannot start message listener on queue '" + queueName + "': handler is null");
			return;
		}
		Connection conn = null;
		try {
			// getConnection() already starts the connection, so no second start() here.
			conn = getConnection();
			// ActiveMQ prefetch policy
			ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
			prefetchPolicy.setQueuePrefetch(queuePrefetch);
			((ActiveMQConnection) conn).setPrefetchPolicy(prefetchPolicy);
			// Non-transacted session with automatic acknowledgement
			Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(this.queueName);
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(new CustomMessageListener(handler));
		} catch (Exception e) {
			logger.error("Failed to start message listener on queue '" + queueName + "' at " + address, e);
			// Without a registered listener the connection is useless; release it.
			close(conn);
		}
	}

	/** Callback invoked for each text message received by a listener. */
	public interface IMsgHandler {
		/**
		 * Handles one received text message.
		 *
		 * @param textMsg the received message
		 */
		void receiveMsg(TextMessage textMsg);
	}

	/**
	 * {@link MessageListener} that forwards {@link TextMessage}s to an
	 * {@link IMsgHandler} and ignores every other message type.
	 */
	public class CustomMessageListener implements MessageListener {
		private final Logger logger = LoggerFactory.getLogger(getClass());
		private IMsgHandler msgHandler;

		/**
		 * @param msgHandler handler that receives the text messages
		 */
		public CustomMessageListener(IMsgHandler msgHandler) {
			super();
			this.msgHandler = msgHandler;
		}

		/**
		 * Passes a text message to the handler and then logs its content.
		 * Non-text messages are ignored. Only {@link JMSException} is caught here;
		 * runtime exceptions thrown by the handler propagate to the caller.
		 *
		 * @param message the delivered message
		 */
		@Override
		public void onMessage(Message message) {
			if (!(message instanceof TextMessage)) {
				// Received a non-text message: intentionally ignored.
				return;
			}
			TextMessage textMsg = (TextMessage) message;
			logger.info("ActiveMq接收到一个纯文本消息。");
			try {
				msgHandler.receiveMsg(textMsg);
				logger.info("\t消息内容是：{}", textMsg.getText());
			} catch (JMSException e) {
				logger.error("Failed to read the text of a received message", e);
			}
		}
	}

	// == Shared helpers =============================
	/**
	 * Commits the transaction of the given session.
	 *
	 * @param session transacted session to commit
	 * @throws JMSException if the commit fails
	 */
	private void commit(Session session) throws JMSException {
		session.commit();
	}

	/**
	 * Rolls back the transaction of the given session, logging instead of
	 * throwing when the rollback itself fails.
	 *
	 * @param session session to roll back; {@code null} is ignored
	 */
	private void rollback(Session session) {
		if (session == null) {
			return;
		}
		try {
			session.rollback();
		} catch (JMSException e) {
			logger.error("Failed to roll back JMS session", e);
		}
	}

	/**
	 * Closes the given connection (best effort) and clears the thread-local slot
	 * if it still refers to that connection.
	 *
	 * @param target connection to close; {@code null} is ignored
	 */
	private synchronized void close(Connection target) {
		if (target == null) {
			return;
		}
		try {
			target.close();
		} catch (Exception e) {
			logger.warn("Failed to close ActiveMQ connection", e);
		} finally {
			// Clear the slot even when close() failed, and use remove() so no
			// stale entry stays attached to the thread.
			if (connection.get() == target) {
				connection.remove();
			}
		}
	}

}
